--
-- @Author:      name
-- @DateTime:    2018-03-30 23:05:48
-- @Description: 消息的处理
require "skynet.manager"
local skynet = require "skynet"
local mysql = require "skynet.db.mysql"
local log = require "Logger"
local MessageHandler = class("MessageHandler")



local maxconn = 2 --db连接数
local index = 2
---------------------------------------------------------
-- Private
---------------------------------------------------------
function MessageHandler:ctor(message_dispatch)
	self.message_dispatch = message_dispatch
	self.pool = {} --连接池
	self.maxconn = 2 --db连接数
	self.connect_args = nil --连接参数

	self:register()
end

--注册本服务里的消息
function MessageHandler:register()
	local tbMessage = {
		"start",
		"execute",
	}
	for k,v in pairs(tbMessage) do 
		self.message_dispatch:registerSelf(v, handler(self,self[v]))
	end
end

function MessageHandler:getconnect(async)
	local db
	if async then
		db = self.pool[1]
	else
		db = self.pool[index]
		assert(db, "_____数据库连接对象错误")
		index = index + 1
		if index > self.maxconn then
			index = 2
		end
	end
	return db
end

--正常情况下，每5分钟运行一次保活
--连接断开情况下，每1秒钟运行一次保活，并且mysql driver底层会进行重连
function MessageHandler:heartbeat()
	local function queryFunc(db_service)
		return db_service:query('select 1')   
	end	
    skynet.fork(function ()
        while true do
        	skynet.sleep(100*5) --100=1秒
            for k,db in pairs(self.pool) do
            	local status, res = pcall(queryFunc, db)  
            	if not status then --发送消息失败 socketchannel 会自动重连
            		print("###发送数据到mysql失败, 自动重连。。。")
            	end
            end            
        end
    end)
end


---------------------------------------------------------
-- CMD
---------------------------------------------------------

function MessageHandler:start(args)
	self.maxconn = tonumber(skynet.getenv("mysql_maxconn")) or maxconn
	assert(self.maxconn >= 2)
	local args = args
	if not args then 
		args = {
			host = skynet.getenv("mysql_host"),
			port = tonumber(skynet.getenv("mysql_port")),				  
			user = skynet.getenv("mysql_user"),
			password = skynet.getenv("mysql_pwd"),
			database = skynet.getenv("mysql_db"),
		}
	end
	self.connect_args = {
		host = args.host,
		port = args.port,
		database = args.database,
		user = args.user,
		password = args.password,	
		max_packet_size = 1024 * 1024,
	}

	for i = 1, self.maxconn do
		local db_service = mysql.connect(self.connect_args )
		if db_service then
			table.insert(self.pool, db_service)
		else
			skynet.error("mysql connect error")
		end		
	end
	self:heartbeat()
end

-- async 不着急的写操作
-- 写操作取连接池中的第一个连接进行操作
function MessageHandler:execute(sql, async)
	local db = self:getconnect(async)
	--print("____________sql___",sql)
    local ok,result = x_pcall(function(sql, ...)
    	local result = db:query(sql)
    	if result.errno or result.err then
    		log.error(' DB##############'..' Have'..' errno:'..tostring(result.errno)..' errmsg:'..tostring(result.err)..' error sql: '..sql)
    		return nil
    	end
    	return result
    end, sql)
    if not ok then
        log.error('[ DB##############'..' not ok! result: ', result, ' error sql: ', sql)
        return nil
    end	
	return result
end

function MessageHandler:stop()
	for _, db in pairs(self.pool) do
		db:disconnect()
	end
	self.pool = {}
end
-----------------列表end

return MessageHandler

